{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "384e5e16-7213-4186-9d04-09d03b155534",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "# Feathr Quick Start Notebook\n",
    "\n",
    "This notebook illustrates the use of Feathr Feature Store to create a model that predicts NYC Taxi fares. The dataset comes from [here](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page).\n",
    "\n",
    "The major problems Feathr solves are:\n",
    "\n",
    "1. Create, share and manage useful features from raw source data.\n",
    "2. Provide Point-in-time feature join to create training dataset to ensure no data leakage.\n",
    "3. Deploy the same feature data to online store to eliminate training and inference data skew."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Prerequisite\n",
    "\n",
    "Feathr has native cloud integration. First step is to provision required cloud resources if you want to use Feathr.\n",
    "\n",
    "Follow the [Feathr ARM deployment guide](https://feathr-ai.github.io/feathr/how-to-guides/azure-deployment-arm.html) to run Feathr on Azure. This allows you to quickly get started with automated deployment using Azure Resource Manager template. For more details, please refer [README.md](https://github.com/feathr-ai/feathr#%EF%B8%8F-running-feathr-on-cloud-with-a-few-simple-steps).\n",
    "\n",
    "Additionally, to run this notebook, you'll need to install `feathr` pip package. For local spark, simply run `pip install feathr` on the machine that runs this notebook. To use Databricks or Azure Synapse Analytics, please see dependency management documents:\n",
    "- [Azure Databricks dependency management](https://learn.microsoft.com/en-us/azure/databricks/libraries/)\n",
    "- [Azure Synapse Analytics dependency management](https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-azure-portal-add-libraries)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Notebook Steps\n",
    "\n",
    "This tutorial demonstrates the key capabilities of Feathr, including:\n",
    "\n",
    "1. Install Feathr and necessary dependencies\n",
    "2. Create shareable features with Feathr feature definition configs\n",
    "3. Create training data using point-in-time correct feature join\n",
    "4. Train a prediction model and evaluate the model and features\n",
    "5. Register the features to share across teams\n",
    "6. Materialize feature values for online scoring\n",
    "\n",
    "The overall data flow is as follows:\n",
    "\n",
    "<img src=\"https://raw.githubusercontent.com/feathr-ai/feathr/main/docs/images/feature_flow.png\" width=\"800\">"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 1. Install Feathr and Necessary Dependancies\n",
    "\n",
    "Install feathr and necessary packages by running one of following commends if you haven't installed them already:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# To install feathr from the latest codes in the repo:\n",
    "#%pip install \"git+https://github.com/feathr-ai/feathr.git#subdirectory=feathr_project&egg=feathr[notebook]\" \n",
    "\n",
    "# To install the latest release:\n",
    "#%pip install \"feathr[notebook]\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%load_ext autoreload\n",
    "%autoreload 2"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "80223a02-631c-40c8-91b3-a037249ffff9",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "from datetime import timedelta\n",
    "import os\n",
    "from pathlib import Path\n",
    "\n",
    "from pyspark.ml import Pipeline\n",
    "from pyspark.ml.evaluation import RegressionEvaluator\n",
    "from pyspark.ml.feature import VectorAssembler\n",
    "from pyspark.ml.regression import GBTRegressor\n",
    "from pyspark.sql import DataFrame, SparkSession\n",
    "import pyspark.sql.functions as F\n",
    "\n",
    "import feathr\n",
    "from feathr import (\n",
    "    FeathrClient,\n",
    "    # Feature data types\n",
    "    BOOLEAN, FLOAT, INT32, ValueType,\n",
    "    # Feature data sources\n",
    "    INPUT_CONTEXT, HdfsSource,\n",
    "    # Feature aggregations\n",
    "    TypedKey, WindowAggTransformation,\n",
    "    # Feature types and anchor\n",
    "    DerivedFeature, Feature, FeatureAnchor,\n",
    "    # Materialization\n",
    "    BackfillTime, MaterializationSettings, RedisSink,\n",
    "    # Offline feature computation\n",
    "    FeatureQuery, ObservationSettings,\n",
    ")\n",
    "from feathr.datasets import nyc_taxi\n",
    "from feathr.spark_provider.feathr_configurations import SparkExecutionConfiguration\n",
    "from feathr.utils.config import generate_config\n",
    "from feathr.utils.job_utils import get_result_df\n",
    "from feathr.utils.platform import is_databricks, is_jupyter\n",
    "\n",
    "print(f\"Feathr version: {feathr.__version__}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 2. Create Shareable Features with Feathr Feature Definition Configs\n",
    "\n",
    "First, we define all the necessary resource key values for authentication. These values are retrieved by using [Azure Key Vault](https://azure.microsoft.com/en-us/services/key-vault/) cloud key value store. For authentication, we use Azure CLI credential in this notebook, but you may add secrets' list and get permission for the necessary service principal instead of running `az login --use-device-code`.\n",
    "\n",
    "Please refer to [A note on using azure key vault to store credentials](https://github.com/feathr-ai/feathr/blob/41e7496b38c43af6d7f8f1de842f657b27840f6d/docs/how-to-guides/feathr-configuration-and-env.md#a-note-on-using-azure-key-vault-to-store-credentials) for more details."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "tags": [
     "parameters"
    ]
   },
   "outputs": [],
   "source": [
    "RESOURCE_PREFIX = None  # TODO fill the value used to deploy the resources via ARM template\n",
    "PROJECT_NAME = \"nyc_taxi\"\n",
    "\n",
    "# Currently support: 'azure_synapse', 'databricks', and 'local' \n",
    "SPARK_CLUSTER = \"local\"\n",
    "\n",
    "# TODO fill values to use databricks cluster:\n",
    "DATABRICKS_CLUSTER_ID = None             # Set Databricks cluster id to use an existing cluster\n",
    "if is_databricks():\n",
    "    # If this notebook is running on Databricks, its context can be used to retrieve token and instance URL\n",
    "    ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()\n",
    "    DATABRICKS_WORKSPACE_TOKEN_VALUE = ctx.apiToken().get()\n",
    "    SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL = f\"https://{ctx.tags().get('browserHostName').get()}\"\n",
    "else:\n",
    "    DATABRICKS_WORKSPACE_TOKEN_VALUE = None                  # Set Databricks workspace token to use databricks\n",
    "    SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL = None  # Set Databricks workspace url to use databricks\n",
    "\n",
    "# TODO fill values to use Azure Synapse cluster:\n",
    "AZURE_SYNAPSE_SPARK_POOL = None  # Set Azure Synapse Spark pool name\n",
    "AZURE_SYNAPSE_URL = None         # Set Azure Synapse workspace url to use Azure Synapse\n",
    "ADLS_KEY = None                  # Set Azure Data Lake Storage key to use Azure Synapse\n",
    "\n",
    "# An existing Feathr config file path. If None, we'll generate a new config based on the constants in this cell.\n",
    "FEATHR_CONFIG_PATH = None\n",
    "\n",
    "# If set True, use an interactive browser authentication to get the redis password.\n",
    "USE_CLI_AUTH = False\n",
    "\n",
    "# If set True, register the features to Feathr registry.\n",
    "REGISTER_FEATURES = False\n",
    "\n",
    "# (For the notebook test pipeline) If true, use ScrapBook package to collect the results.\n",
    "SCRAP_RESULTS = False"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To use Databricks as the feathr client's target platform, you may need to set a databricks token to an environment variable like:\n",
    "\n",
    "`export DATABRICKS_WORKSPACE_TOKEN_VALUE=your-token`\n",
    "\n",
    "or in the notebook cell,\n",
    "\n",
    "`os.environ[\"DATABRICKS_WORKSPACE_TOKEN_VALUE\"] = your-token`\n",
    "\n",
    "If you are running this notebook on Databricks, the token will be automatically retrieved by using the current Databricks notebook context.\n",
    "\n",
    "On the other hand, to use Azure Synapse cluster, you have to specify the synapse workspace storage key:\n",
    "\n",
    "`export ADLS_KEY=your-key`\n",
    "\n",
    "or in the notebook cell,\n",
    "\n",
    "`os.environ[\"ADLS_KEY\"] = your-key`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if SPARK_CLUSTER == \"azure_synapse\" and not os.environ.get(\"ADLS_KEY\"):\n",
    "    os.environ[\"ADLS_KEY\"] = ADLS_KEY\n",
    "elif SPARK_CLUSTER == \"databricks\" and not os.environ.get(\"DATABRICKS_WORKSPACE_TOKEN_VALUE\"):\n",
    "    os.environ[\"DATABRICKS_WORKSPACE_TOKEN_VALUE\"] = DATABRICKS_WORKSPACE_TOKEN_VALUE"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Get an authentication credential to access Azure resources and register features\n",
    "if USE_CLI_AUTH:\n",
    "    # Use AZ CLI interactive browser authentication\n",
    "    !az login --use-device-code\n",
    "    from azure.identity import AzureCliCredential\n",
    "    credential = AzureCliCredential(additionally_allowed_tenants=['*'],)\n",
    "elif \"AZURE_TENANT_ID\" in os.environ and \"AZURE_CLIENT_ID\" in os.environ and \"AZURE_CLIENT_SECRET\" in os.environ:\n",
    "    # Use Environment variable secret\n",
    "    from azure.identity import EnvironmentCredential\n",
    "    credential = EnvironmentCredential()\n",
    "else:\n",
    "    # Try to use the default credential\n",
    "    from azure.identity import DefaultAzureCredential\n",
    "    credential = DefaultAzureCredential(\n",
    "        exclude_interactive_browser_credential=False,\n",
    "        additionally_allowed_tenants=['*'],\n",
    "    )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Redis password\n",
    "if 'REDIS_PASSWORD' not in os.environ:\n",
    "    from azure.keyvault.secrets import SecretClient\n",
    "    vault_url = f\"https://{RESOURCE_PREFIX}kv.vault.azure.net\"\n",
    "    secret_client = SecretClient(vault_url=vault_url, credential=credential)\n",
    "    retrieved_secret = secret_client.get_secret('FEATHR-ONLINE-STORE-CONN').value\n",
    "    os.environ['REDIS_PASSWORD'] = retrieved_secret.split(\",\")[1].split(\"password=\", 1)[1]"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "41d3648a-9bc9-40dc-90da-bc82b21ef9b3",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "### Configurations\n",
    "\n",
    "Feathr uses a yaml file to define configurations. Please refer to [feathr_config.yaml]( https://github.com//feathr-ai/feathr/blob/main/feathr_project/feathrcli/data/feathr_user_workspace/feathr_config.yaml) for the meaning of each field.\n",
    "\n",
    "All the Feathr configurations can be set to the yaml file via keyword arguments of `generate_config` helper function. Each keyword argument should be the concatenation of different layers of the config name using `__` as a separator.\n",
    "For example, if you want to specify a different value for the feature registry api endpoint, you can pass `        feature_registry__api_endpoint=\"YOUR-API-ENDPOINT-URL\"`.\n",
    "\n",
    "Note, a default value for the api endpoint will be set based on `RESOURCE_PREFIX`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "8cd64e3a-376c-48e6-ba41-5197f3591d48",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "if FEATHR_CONFIG_PATH:\n",
    "    config_path = FEATHR_CONFIG_PATH\n",
    "else:\n",
    "    config_path = generate_config(\n",
    "        resource_prefix=RESOURCE_PREFIX,\n",
    "        project_name=PROJECT_NAME,\n",
    "        spark_config__spark_cluster=SPARK_CLUSTER,\n",
    "        spark_config__azure_synapse__dev_url=AZURE_SYNAPSE_URL,\n",
    "        spark_config__azure_synapse__pool_name=AZURE_SYNAPSE_SPARK_POOL,\n",
    "        spark_config__databricks__workspace_instance_url=SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL,\n",
    "        databricks_cluster_id=DATABRICKS_CLUSTER_ID,\n",
    "    )\n",
    "\n",
    "with open(config_path, 'r') as f: \n",
    "    print(f.read())"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "All the configurations can be overwritten by environment variables with concatenation of `__` for different layers of the config file, same as how you may pass the keyword arguments to `generate_config` utility function.\n",
    "\n",
    "For example, `feathr_runtime_location` for databricks config can be overwritten by setting `spark_config__databricks__feathr_runtime_location` environment variable."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "3fef7f2f-df19-4f53-90a5-ff7999ed983d",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "### Initialize Feathr client"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "9713a2df-c7b2-4562-88b0-b7acce3cc43a",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "client = FeathrClient(config_path=config_path, credential=credential)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "c3b64bda-d42c-4a64-b976-0fb604cf38c5",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "### Prepare the NYC taxi fare dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# If the notebook is runnong on Jupyter, start a spark session:\n",
    "if is_jupyter():\n",
    "    spark = (\n",
    "        SparkSession\n",
    "        .builder\n",
    "        .appName(\"feathr\")\n",
    "        .config(\"spark.jars.packages\", \"org.apache.spark:spark-avro_2.12:3.3.0,io.delta:delta-core_2.12:2.1.1\")\n",
    "        .config(\"spark.sql.extensions\", \"io.delta.sql.DeltaSparkSessionExtension\")\n",
    "        .config(\"spark.sql.catalog.spark_catalog\", \"org.apache.spark.sql.delta.catalog.DeltaCatalog\")\n",
    "        .config(\"spark.ui.port\", \"8080\")  # Set ui port other than the default one (4040) so that feathr spark job doesn't fail. \n",
    "        .getOrCreate()\n",
    "    )\n",
    "\n",
    "# Else, you must already have a spark session object available in databricks or synapse notebooks."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Use dbfs if the notebook is running on Databricks\n",
    "if is_databricks():\n",
    "    WORKING_DIR = f\"/dbfs/{PROJECT_NAME}\"\n",
    "else:\n",
    "    WORKING_DIR = PROJECT_NAME"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "c4ccd7b3-298a-4e5a-8eec-b7e309db393e",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "# Download the data file\n",
    "data_file_path = f\"{WORKING_DIR}/nyc_taxi_data.csv\"\n",
    "df_raw = nyc_taxi.get_spark_df(spark=spark, local_cache_path=data_file_path)\n",
    "df_raw.limit(5).show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "7430c942-64e5-4b70-b823-16ce1d1b3cee",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "### Defining features with Feathr\n",
    "\n",
    "In Feathr, a feature is viewed as a function, mapping a key and timestamp to a feature value. For more details, please see [Feathr Feature Definition Guide](https://github.com/feathr-ai/feathr/blob/main/docs/concepts/feature-definition.md).\n",
    "\n",
    "* The feature key (a.k.a. entity id) identifies the subject of feature, e.g. a user_id or location_id.\n",
    "* The feature name is the aspect of the entity that the feature is indicating, e.g. the age of the user.\n",
    "* The feature value is the actual value of that aspect at a particular time, e.g. the value is 30 at year 2022.\n",
    "\n",
    "Note that, in some cases, a feature could be just a transformation function that has no entity key or timestamp involved, e.g. *the day of week of the request timestamp*.\n",
    "\n",
    "There are two types of features -- anchored features and derivated features:\n",
    "\n",
    "* **Anchored features**: Features that are directly extracted from sources. Could be with or without aggregation. \n",
    "* **Derived features**: Features that are computed on top of other features.\n",
    "\n",
    "#### Define anchored features\n",
    "\n",
    "A feature source is needed for anchored features that describes the raw data in which the feature values are computed from. A source value should be either `INPUT_CONTEXT` (the features that will be extracted from the observation data directly) or `feathr.source.Source` object."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "TIMESTAMP_COL = \"lpep_dropoff_datetime\"\n",
    "TIMESTAMP_FORMAT = \"yyyy-MM-dd HH:mm:ss\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "a373ecbe-a040-4cd3-9d87-0d5f4c5ba553",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "# We define f_trip_distance and f_trip_time_duration features separately\n",
    "# so that we can reuse them later for the derived features.\n",
    "f_trip_distance = Feature(\n",
    "    name=\"f_trip_distance\",\n",
    "    feature_type=FLOAT,\n",
    "    transform=\"trip_distance\",\n",
    ")\n",
    "f_trip_time_duration = Feature(\n",
    "    name=\"f_trip_time_duration\",\n",
    "    feature_type=FLOAT,\n",
    "    transform=\"cast_float((to_unix_timestamp(lpep_dropoff_datetime) - to_unix_timestamp(lpep_pickup_datetime)) / 60)\",\n",
    ")\n",
    "\n",
    "features = [\n",
    "    f_trip_distance,\n",
    "    f_trip_time_duration,\n",
    "    Feature(\n",
    "        name=\"f_is_long_trip_distance\",\n",
    "        feature_type=BOOLEAN,\n",
    "        transform=\"trip_distance > 30.0\",\n",
    "    ),\n",
    "    Feature(\n",
    "        name=\"f_day_of_week\",\n",
    "        feature_type=INT32,\n",
    "        transform=\"dayofweek(lpep_dropoff_datetime)\",\n",
    "    ),\n",
    "    Feature(\n",
    "        name=\"f_day_of_month\",\n",
    "        feature_type=INT32,\n",
    "        transform=\"dayofmonth(lpep_dropoff_datetime)\",\n",
    "    ),\n",
    "    Feature(\n",
    "        name=\"f_hour_of_day\",\n",
    "        feature_type=INT32,\n",
    "        transform=\"hour(lpep_dropoff_datetime)\",\n",
    "    ),\n",
    "]\n",
    "\n",
    "# After you have defined features, bring them together to build the anchor to the source.\n",
    "feature_anchor = FeatureAnchor(\n",
    "    name=\"feature_anchor\",\n",
    "    source=INPUT_CONTEXT,  # Pass through source, i.e. observation data.\n",
    "    features=features,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can define the source with a preprocessing python function. In order to make the source data accessible from the target spark cluster, we upload the data file into either DBFS or Azure Blob Storage if needed."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Upload files to cloud if needed\n",
    "if client.spark_runtime == \"local\":\n",
    "    # In local mode, we can use the same data path as the source.\n",
    "    data_source_path = data_file_path\n",
    "elif client.spark_runtime == \"databricks\" and is_databricks():\n",
    "    # If the notebook is running on databricks, we can use the same data path as the source.\n",
    "    data_source_path = data_file_path.replace(\"/dbfs\", \"dbfs:\", 1)\n",
    "else:\n",
    "    # Otherwise, upload the local file to the cloud storage (either dbfs or adls).\n",
    "    data_source_path = client.feathr_spark_launcher.upload_or_get_cloud_path(data_file_path)    "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def preprocessing(df: DataFrame) -> DataFrame:\n",
    "    import pyspark.sql.functions as F\n",
    "    df = df.withColumn(\"fare_amount_cents\", (F.col(\"fare_amount\") * 100.0).cast(\"float\"))\n",
    "    return df\n",
    "\n",
    "batch_source = HdfsSource(\n",
    "    name=\"nycTaxiBatchSource\",\n",
    "    path=data_source_path,\n",
    "    event_timestamp_column=TIMESTAMP_COL,\n",
    "    timestamp_format=TIMESTAMP_FORMAT,\n",
    "    preprocessing=preprocessing,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For the features with aggregation, the supported functions are as follows:\n",
    "\n",
    "| Aggregation Function | Input Type | Description |\n",
    "| --- | --- | --- |\n",
    "|SUM, COUNT, MAX, MIN, AVG\t|Numeric|Applies the the numerical operation on the numeric inputs. |\n",
    "|MAX_POOLING, MIN_POOLING, AVG_POOLING\t| Numeric Vector | Applies the max/min/avg operation on a per entry bassis for a given a collection of numbers.|\n",
    "|LATEST| Any |Returns the latest not-null values from within the defined time window |"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "agg_key = TypedKey(\n",
    "    key_column=\"DOLocationID\",\n",
    "    key_column_type=ValueType.INT32,\n",
    "    description=\"location id in NYC\",\n",
    "    full_name=\"nyc_taxi.location_id\",\n",
    ")\n",
    "\n",
    "agg_window = \"90d\"\n",
    "\n",
    "# Anchored features with aggregations\n",
    "agg_features = [\n",
    "    Feature(\n",
    "        name=\"f_location_avg_fare\",\n",
    "        key=agg_key,\n",
    "        feature_type=FLOAT,\n",
    "        transform=WindowAggTransformation(\n",
    "            agg_expr=\"fare_amount_cents\",\n",
    "            agg_func=\"AVG\",\n",
    "            window=agg_window,\n",
    "        ),\n",
    "    ),\n",
    "    Feature(\n",
    "        name=\"f_location_max_fare\",\n",
    "        key=agg_key,\n",
    "        feature_type=FLOAT,\n",
    "        transform=WindowAggTransformation(\n",
    "            agg_expr=\"fare_amount_cents\",\n",
    "            agg_func=\"MAX\",\n",
    "            window=agg_window,\n",
    "        ),\n",
    "    ),\n",
    "]\n",
    "\n",
    "agg_feature_anchor = FeatureAnchor(\n",
    "    name=\"agg_feature_anchor\",\n",
    "    source=batch_source,  # External data source for feature. Typically a data table.\n",
    "    features=agg_features,\n",
    ")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "d2ecaca9-057e-4b36-811f-320f66f753ed",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "#### Define derived features\n",
    "\n",
    "We also define a derived feature, `f_trip_speed`, from the anchored features `f_trip_distance` and `f_trip_time_duration` as follows:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "270fb11e-8a71-404f-9639-ad29d8e6a2c1",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "derived_features = [\n",
    "    DerivedFeature(\n",
    "        name=\"f_trip_speed\",\n",
    "        feature_type=FLOAT,\n",
    "        input_features=[\n",
    "            f_trip_distance,\n",
    "            f_trip_time_duration,\n",
    "        ],\n",
    "        transform=\"f_trip_distance / f_trip_time_duration\",\n",
    "    )\n",
    "]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "ad102c45-586d-468c-85f0-9454401ef10b",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "### Build features\n",
    "\n",
    "Finally, we build the features."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "91bb5ebb-87e4-470b-b8eb-1c89b351740e",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "client.build_features(\n",
    "    anchor_list=[feature_anchor, agg_feature_anchor],\n",
    "    derived_feature_list=derived_features,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "632d5f46-f9e2-41a8-aab7-34f75206e2aa",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "## 3. Create Training Data Using Point-in-Time Correct Feature Join\n",
    "\n",
    "After the feature producers have defined the features (as described in the Feature Definition part), the feature consumers may want to consume those features. Feature consumers will use observation data to query from different feature tables using Feature Query.\n",
    "\n",
    "To create a training dataset using Feathr, one needs to provide a feature join configuration file to specify\n",
    "what features and how these features should be joined to the observation data. \n",
    "\n",
    "To learn more on this topic, please refer to [Point-in-time Correctness](https://github.com//feathr-ai/feathr/blob/main/docs/concepts/point-in-time-join.md)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "feature_names = [feature.name for feature in features + agg_features + derived_features]\n",
    "feature_names"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "DATA_FORMAT = \"parquet\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "e438e6d8-162e-4aa3-b3b3-9d1f3b0d2b7f",
     "showTitle": false,
     "title": ""
    },
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "# Features that we want to request. Can use a subset of features\n",
    "query = FeatureQuery(\n",
    "    feature_list=feature_names,\n",
    "    key=agg_key,\n",
    ")\n",
    "settings = ObservationSettings(\n",
    "    observation_path=data_source_path,\n",
    "    event_timestamp_column=TIMESTAMP_COL,\n",
    "    timestamp_format=TIMESTAMP_FORMAT,\n",
    ")\n",
    "client.get_offline_features(\n",
    "    observation_settings=settings,\n",
    "    feature_query=query,\n",
    "    # For more details, see https://feathr-ai.github.io/feathr/how-to-guides/feathr-job-configuration.html\n",
    "    execution_configurations=SparkExecutionConfiguration({\n",
    "        \"spark.feathr.outputFormat\": DATA_FORMAT,\n",
    "    }),\n",
    "    output_path=data_source_path.rpartition(\"/\")[0] + f\"/features.{DATA_FORMAT}\",\n",
    ")\n",
    "\n",
    "client.wait_job_to_finish(timeout_sec=5000)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Show feature results\n",
    "df = get_result_df(\n",
    "    spark=spark,\n",
    "    client=client,\n",
    "    data_format=DATA_FORMAT,\n",
    ")\n",
    "df.select(feature_names).limit(5).show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "dcbf17fc-7f79-4a65-a3af-9cffbd0b5d1f",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "## 4. Train a Prediction Model and Evaluate the Features\n",
    "\n",
    "After generating all the features, we train and evaluate a machine learning model to predict the NYC taxi fare prediction. In this example, we use Spark MLlib's [GBTRegressor](https://spark.apache.org/docs/latest/ml-classification-regression.html#gradient-boosted-tree-regression).\n",
    "\n",
    "Note that designing features, training prediction models and evaluating them are an iterative process where the models' performance maybe used to modify the features as a part of the modeling process."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Load Train and Test Data from the Offline Feature Values"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Train / test split\n",
    "train_df, test_df = (\n",
    "    df  # Dataframe that we generated from get_offline_features call.\n",
    "    .withColumn(\"label\", F.col(\"fare_amount\").cast(\"double\"))\n",
    "    .where(F.col(\"f_trip_time_duration\") > 0)\n",
    "    .fillna(0)\n",
    "    .randomSplit([0.8, 0.2])\n",
    ")\n",
    "\n",
    "print(f\"Num train samples: {train_df.count()}\")\n",
    "print(f\"Num test samples: {test_df.count()}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Build a ML Pipeline\n",
    "\n",
    "Here, we use Spark ML Pipeline to aggregate feature vectors and feed them to the model."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Generate a feature vector column for SparkML\n",
    "vector_assembler = VectorAssembler(\n",
    "    inputCols=[x for x in df.columns if x in feature_names],\n",
    "    outputCol=\"features\",\n",
    ")\n",
    "\n",
    "# Define a model\n",
    "gbt = GBTRegressor(\n",
    "    featuresCol=\"features\",\n",
    "    maxIter=100,\n",
    "    maxDepth=5,\n",
    "    maxBins=16,\n",
    ")\n",
    "\n",
    "# Create a ML pipeline\n",
    "ml_pipeline = Pipeline(stages=[\n",
    "    vector_assembler,\n",
    "    gbt,\n",
    "])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Train and Evaluate the Model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Train a model\n",
    "model = ml_pipeline.fit(train_df)\n",
    "\n",
    "# Make predictions\n",
    "predictions = model.transform(test_df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Evaluate\n",
    "evaluator = RegressionEvaluator(\n",
    "    labelCol=\"label\",\n",
    "    predictionCol=\"prediction\",\n",
    ")\n",
    "\n",
    "rmse = evaluator.evaluate(predictions, {evaluator.metricName: \"rmse\"})\n",
    "mae = evaluator.evaluate(predictions, {evaluator.metricName: \"mae\"})\n",
    "print(f\"RMSE: {rmse}\\nMAE: {mae}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# predicted fare vs actual fare plots -- will this work for databricks / synapse / local ?\n",
    "predictions_pdf = None\n",
    "try:\n",
    "    predictions_pdf = predictions.select([\"label\", \"prediction\"]).toPandas().reset_index()\n",
    "\n",
    "    predictions_pdf.plot(\n",
    "        x=\"index\",\n",
    "        y=[\"label\", \"prediction\"],\n",
    "        style=['-', ':'],\n",
    "        figsize=(20, 10),\n",
    "    )\n",
    "except NameError:\n",
    "    print(\"Inconsistent NumPy version detected. Skipping plotting.\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if predictions_pdf is not None:\n",
    "    predictions_pdf.plot.scatter(\n",
    "        x=\"label\",\n",
    "        y=\"prediction\",\n",
    "        xlim=(0, 100),\n",
    "        ylim=(0, 100),\n",
    "        figsize=(10, 10),\n",
    "    )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 5. Register the Features to Share Across Teams\n",
    "\n",
    "You can register your features in the centralized registry and share the corresponding project with other team members who want to consume those features and for further use."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if REGISTER_FEATURES:\n",
    "    try:\n",
    "        client.register_features()\n",
    "    except Exception as e:\n",
    "        print(e)  \n",
    "    print(client.list_registered_features(project_name=PROJECT_NAME))\n",
    "    # You can get the actual features too by calling client.get_features_from_registry(PROJECT_NAME)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "5a226026-1c7b-48db-8f91-88d5c2ddf023",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "## 6. Materialize Feature Values for Online Scoring\n",
    "\n",
    "While we computed feature values on-the-fly at request time via Feathr, we can pre-compute the feature values and materialize them to offline or online storages such as Redis.\n",
    "\n",
    "Note, only the features anchored to offline data source can be materialized."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Get the last date from the dataset\n",
    "backfill_timestamp = (\n",
    "    df_raw\n",
    "    .select(F.to_timestamp(F.col(TIMESTAMP_COL), TIMESTAMP_FORMAT).alias(TIMESTAMP_COL))\n",
    "    .agg({TIMESTAMP_COL: \"max\"})\n",
    "    .collect()[0][0]\n",
    ")\n",
    "backfill_timestamp"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "3b924c66-8634-42fe-90f3-c844487d3f75",
     "showTitle": false,
     "title": ""
    },
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "FEATURE_TABLE_NAME = \"nycTaxiDemoFeature\"\n",
    "\n",
    "# Time range to materialize\n",
    "backfill_time = BackfillTime(\n",
    "    start=backfill_timestamp,\n",
    "    end=backfill_timestamp,\n",
    "    step=timedelta(days=1),\n",
    ")\n",
    "\n",
    "# Destinations:\n",
    "# For online store,\n",
    "redis_sink = RedisSink(table_name=FEATURE_TABLE_NAME)\n",
    "\n",
    "# For offline store,\n",
    "# adls_sink = HdfsSink(output_path=)\n",
    "\n",
    "settings = MaterializationSettings(\n",
    "    name=FEATURE_TABLE_NAME + \".job\",  # job name\n",
    "    backfill_time=backfill_time,\n",
    "    sinks=[redis_sink],  # or adls_sink\n",
    "    feature_names=[feature.name for feature in agg_features],\n",
    ")\n",
    "\n",
    "client.materialize_features(\n",
    "    settings=settings,\n",
    "    execution_configurations={\"spark.feathr.outputFormat\": \"parquet\"},\n",
    ")\n",
    "\n",
    "client.wait_job_to_finish(timeout_sec=5000)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now, you can retrieve features for online scoring as follows:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Note, to get a single key, you may use client.get_online_features instead\n",
    "materialized_feature_values = client.multi_get_online_features(\n",
    "    feature_table=FEATURE_TABLE_NAME,\n",
    "    keys=[\"239\", \"265\"],\n",
    "    feature_names=[feature.name for feature in agg_features],\n",
    ")\n",
    "materialized_feature_values"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Cleanup"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# TODO: Unregister, delete cached files or do any other cleanups."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Stop the spark session if it is a local session.\n",
    "if is_jupyter():\n",
    "    spark.stop()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Cleaning up the output files. CAUTION: this maybe dangerous if you \"reused\" the project name.\n",
    "import shutil\n",
    "shutil.rmtree(WORKING_DIR, ignore_errors=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Scrap Variables for Unit-Test"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if SCRAP_RESULTS:\n",
    "    # Record results for test pipelines\n",
    "    import scrapbook as sb\n",
    "    sb.glue(\"materialized_feature_values\", materialized_feature_values)\n",
    "    sb.glue(\"rmse\", rmse)\n",
    "    sb.glue(\"mae\", mae)"
   ]
  }
 ],
 "metadata": {
  "application/vnd.databricks.v1+notebook": {
   "dashboards": [],
   "language": "python",
   "notebookMetadata": {
    "pythonIndentUnit": 4
   },
   "notebookName": "nyc_driver_demo",
   "notebookOrigID": 930353059183053,
   "widgets": {}
  },
  "celltoolbar": "Tags",
  "kernelspec": {
   "display_name": "feathr",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.16"
  },
  "vscode": {
   "interpreter": {
    "hash": "ddb0e38f168d5afaa0b8ab4851ddd8c14364f1d087c15de6ff2ee5a559aec1f2"
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
